package com.ysb.config.rabbit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.Objects;

import static com.ysb.constant.RabbitConstants.*;

/**
 * @author scaffolding
 * @since ${dateTime}
 */
@Configuration
@Slf4j
public abstract class AbstractRabbitConfig {

    /**
     * Choosing a Connection Factory
     * <a href="https://docs.spring.io/spring-amqp/docs/2.3.16/reference/html/#choosing-factory">...</a>
     */
    @Bean
    public ConnectionFactory drugQualityConnectionFactory(RabbitProperties rabbitProperties,
                                                          ThreadPoolTaskExecutor commonExecutor) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(rabbitProperties.getAddresses());
        connectionFactory.setPort(rabbitProperties.getPort());
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        // Cache channels - single connection.
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        // The number of channels to maintain in the cache, the default is 25.
        connectionFactory.setChannelCacheSize(MAX_CONSUMERS);
        connectionFactory.setExecutor(commonExecutor);
        // 发送确认及返回
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean
    public DirectExchange drugQualityExchange() {
        return ExchangeBuilder.directExchange(DRUG_QUALITY_EXCHANGE_NAME).durable(true).build();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(MessageConverter jsonMessageConverter,
                                         ConnectionFactory drugQualityConnectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(drugQualityConnectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter);
        // Rabbit Publisher Confirms and Returns
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback());
        rabbitTemplate.setReturnCallback(returnCallback());
        // Separate the connections for publishers and consumers to achieve high throughput.
        rabbitTemplate.setUsePublisherConnection(true);
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory drugQualityConnectionFactory) {
        return new RabbitAdmin(drugQualityConnectionFactory);
    }

    /**
     * 交换机确认回调
     * 1、交换机收到消息：
     * correlationData：保存回调消息的ID和相关信息
     * ack：true
     * cause：null
     * 2、交换机没收到消息：
     * correlationData 保存回调消息的ID和相关信息
     * ack：false
     * cause：失败的原因
     */
    private RabbitTemplate.ConfirmCallback confirmCallback() {
        return (correlationData, ack, cause) -> {
            if (!ack && Objects.nonNull(correlationData)) {
                log.error("发送消息confirmCallback失败,ID:{},cause:{}", correlationData.getId(), cause);
            }
        };
    }

    /**
     * 消息回退回调
     * 仅在交换机将消息转发给队列/交换机失败时才进行回退
     */
    private RabbitTemplate.ReturnCallback returnCallback() {
        return (message, replyCode, replyText, exchange, routingKey) -> {
            log.error("发送消息returnCallback失败,ID:{},replyCode:{},replyText:{},exchange:{},routingKey:{}", message.getMessageProperties().getHeader(SPRING_RETURNED_MESSAGE_CORRELATION), replyCode, replyText, exchange, routingKey);
        };
    }


}
